package com.atguigu.flinkSql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author wky
 * @create 2021-07-21-9:21
 */

//sql 事件时间作为watermark
public class Flink13_Sql_EventTime {
    public static void main(String[] args) {
        //创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //TODO 创建表执行环境
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        //TODO 在sql语句中指定 事件时间 1970开始计算   1970-01-01T08:00:01
        //注意不要多, 号 也不要多 '
        tableEnvironment.executeSql("create table sensor (" +
                "id string, " +
                "ts bigint," +
                " vc int," +
                " t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second) " +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'src/input/sensor_sql.txt'," +
                "'format'='csv'" +
                ")");
        tableEnvironment.executeSql("select * from sensor").print();


    }
}
